package org.frameworkset.elasticsearch.imp.quartz;

import com.frameworkset.util.SimpleStringUtil;
import org.frameworkset.elasticsearch.serial.SerialUtil;
import org.frameworkset.spi.geoip.IpInfo;
import org.frameworkset.tran.DataRefactor;
import org.frameworkset.tran.ExportResultHandler;
import org.frameworkset.tran.config.ImportBuilder;
import org.frameworkset.tran.context.Context;
import org.frameworkset.tran.metrics.TaskMetrics;
import org.frameworkset.tran.plugin.db.input.DBInputConfig;
import org.frameworkset.tran.plugin.db.output.DBOutputConfig;
import org.frameworkset.tran.schedule.ExternalScheduler;
import org.frameworkset.tran.schedule.ImportIncreamentConfig;
import org.frameworkset.tran.schedule.quartz.BaseQuartzDatasynJob;
import org.frameworkset.tran.task.TaskCommand;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;

import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 原生的数据同步quartz作业调度任务
 */
public class ImportDataJob extends BaseQuartzDatasynJob {

    public void init(){
        externalScheduler = new ExternalScheduler();
        externalScheduler.dataStream((Object params)->{
            JobExecutionContext context = (JobExecutionContext)params;
            JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
            Object data = jobDataMap.get("aa");
            ImportBuilder importBuilder = ImportBuilder.newInstance();



            //指定导入数据的sql语句，必填项，可以设置自己的提取逻辑，
            // 设置增量变量log_id，增量变量名称#[log_id]可以多次出现在sql语句的不同位置中，例如：
            // select * from td_sm_log where log_id > #[log_id] and parent_id = #[log_id]
            // 需要设置setLastValueColumn信息log_id，
            // 通过setLastValueType方法告诉工具增量字段的类型，默认是数字类型


            /**
             * 源db相关配置
             */
            DBInputConfig dbInputConfig = new DBInputConfig();
            dbInputConfig.setDbName("source")
                    .setDbDriver("com.mysql.cj.jdbc.Driver") //数据库驱动程序，必须导入相关数据库的驱动jar包
                    .setDbUrl("jdbc:mysql://localhost:3306/bboss?useUnicode=true&characterEncoding=utf-8&useSSL=false") //通过useCursorFetch=true启用mysql的游标fetch机制，否则会有严重的性能隐患，useCursorFetch必须和jdbcFetchSize参数配合使用，否则不会生效
                    .setDbUser("root")
                    .setDbPassword("123456")
                    .setValidateSQL("select 1")
                    .setUsePool(true)//是否使用连接池
                    .setSqlFilepath("sql.xml")
                    .setSqlName("demoexport");
            importBuilder.setInputConfig(dbInputConfig);

            DBOutputConfig dbOutputConfig = new DBOutputConfig();
            dbOutputConfig.setDbName("target")
                    .setDbDriver("com.mysql.cj.jdbc.Driver") //数据库驱动程序，必须导入相关数据库的驱动jar包
                    .setDbUrl("jdbc:mysql://localhost:3306/bboss?useUnicode=true&characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true") //通过useCursorFetch=true启用mysql的游标fetch机制，否则会有严重的性能隐患，useCursorFetch必须和jdbcFetchSize参数配合使用，否则不会生效
                    .setDbUser("root")
                    .setDbPassword("123456")
                    .setValidateSQL("select 1")
                    .setUsePool(true)//是否使用连接池
                    .setSqlFilepath("sql.xml")
                    .setInsertSqlName("insertSql");
            importBuilder.setOutputConfig(dbOutputConfig);

            importBuilder.setBatchSize(10); //可选项,批量导入db的记录数，默认为-1，逐条处理，> 0时批量处理
            //定时任务配置，
            importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明
                    //					 .setScheduleDate(date) //指定任务开始执行时间：日期
                    .setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行
                    .setPeriod(5000L); //每隔period毫秒执行，如果不设置，只执行一次
            //定时任务配置结束
            //
            //		//设置任务执行拦截器，可以添加多个，定时任务每次执行的拦截器
            //		importBuilder.addCallInterceptor(new CallInterceptor() {
            //			@Override
            //			public void preCall(TaskContext taskContext) {
            //				System.out.println("preCall");
            //			}
            //
            //			@Override
            //			public void afterCall(TaskContext taskContext) {
            //				System.out.println("afterCall");
            //			}
            //
            //			@Override
            //			public void throwException(TaskContext taskContext, Exception e) {
            //				System.out.println("throwException");
            //			}
            //		}).addCallInterceptor(new CallInterceptor() {
            //			@Override
            //			public void preCall(TaskContext taskContext) {
            //				System.out.println("preCall 1");
            //			}
            //
            //			@Override
            //			public void afterCall(TaskContext taskContext) {
            //				System.out.println("afterCall 1");
            //			}
            //
            //			@Override
            //			public void throwException(TaskContext taskContext, Exception e) {
            //				System.out.println("throwException 1");
            //			}
            //		});
            //		//设置任务执行拦截器结束，可以添加多个
            //增量配置开始
            //		importBuilder.setLastValueColumn("log_id");//手动指定数字增量查询字段，默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称，指定以后就用指定的字段
            //		importBuilder.setDateLastValueColumn("log_id");//手动指定日期增量查询字段，默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称，指定以后就用指定的字段
            importBuilder.setFromFirst(true);//setFromfirst(false)，如果作业停了，作业重启后从上次截止位置开始采集数据，
            //setFromfirst(true) 如果作业停了，作业重启后，重新开始采集数据
            importBuilder.setLastValueStorePath("logdb2db_import");//记录上次采集的增量字段值的文件路径，作为下次增量（或者重启后）采集数据的起点，不同的任务这个路径要不一样
            //		importBuilder.setLastValueStoreTableName("logs");//记录上次采集的增量字段值的表，可以不指定，采用默认表名increament_tab
            importBuilder.setLastValueType(ImportIncreamentConfig.NUMBER_TYPE);//如果没有指定增量查询字段名称，则需要指定字段类型：ImportIncreamentConfig.NUMBER_TYPE 数字类型
            // 或者ImportIncreamentConfig.TIMESTAMP_TYPE 日期类型
            //增量配置结束

            //映射和转换配置开始
            //		/**
            //		 * db-es mapping 表字段名称到es 文档字段的映射：比如document_id -> docId
            //		 * 可以配置mapping，也可以不配置，默认基于java 驼峰规则进行db field-es field的映射和转换
            //		 */
            //		importBuilder.addFieldMapping("document_id","docId")
            //				.addFieldMapping("docwtime","docwTime")
            //				.addIgnoreFieldMapping("channel_id");//添加忽略字段
            //
            //
            //		/**
            //		 * 为每条记录添加额外的字段和值
            //		 * 可以为基本数据类型，也可以是复杂的对象
            //		 */
            //		importBuilder.addFieldValue("testF1","f1value");
            //		importBuilder.addFieldValue("testInt",0);
            //		importBuilder.addFieldValue("testDate",new Date());
            //		importBuilder.addFieldValue("testFormateDate","yyyy-MM-dd HH",new Date());
            //		TestObject testObject = new TestObject();
            //		testObject.setId("testid");
            //		testObject.setName("jackson");
            //		importBuilder.addFieldValue("testObject",testObject);
            //
            final AtomicInteger s = new AtomicInteger(0);
            importBuilder.setGeoipDatabase("E:/workspace/hnai/terminal/geolite2/GeoLite2-City.mmdb");
            importBuilder.setGeoipAsnDatabase("E:/workspace/hnai/terminal/geolite2/GeoLite2-ASN.mmdb");
            importBuilder.setGeoip2regionDatabase("E:/workspace/hnai/terminal/geolite2/ip2region.db");
            /**
             * 重新设置数据结构
             */
            importBuilder.setDataRefactor(new DataRefactor() {
                public void refactor(Context context) throws Exception  {
                    //可以根据条件定义是否丢弃当前记录
                    //context.setDrop(true);return;
                    //				if(s.incrementAndGet() % 2 == 0) {
                    //					context.setDrop(true);
                    //					return;
                    //				}


                    context.addFieldValue("author","duoduo");
                    context.addFieldValue("title","解放");
                    context.addFieldValue("subtitle","小康");
                    context.addFieldValue("collecttime",new Date());//

                    //				context.addIgnoreFieldMapping("title");
                    //上述三个属性已经放置到docInfo中，如果无需再放置到索引文档中，可以忽略掉这些属性
                    //				context.addIgnoreFieldMapping("author");

                    //				//修改字段名称title为新名称newTitle，并且修改字段的值
                    //				context.newName2ndData("title","newTitle",(String)context.getValue("title")+" append new Value");
                    context.addIgnoreFieldMapping("subtitle");
                    /**
                     * 获取ip对应的运营商和区域信息
                     */
                    IpInfo ipInfo = context.getIpInfo("LOG_VISITORIAL");
                    if(ipInfo != null)
                        context.addFieldValue("ipinfo", SimpleStringUtil.object2json(ipInfo));
                    else{
                        context.addFieldValue("ipinfo", "");
                    }
                    DateFormat dateFormat = SerialUtil.getDateFormateMeta().toDateFormat();
                    Date optime = context.getDateValue("LOG_OPERTIME",dateFormat);
                    context.addFieldValue("logOpertime",optime);
                    context.addFieldValue("collecttime",new Date());
                    //				对数据进行格式化
                    //				DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                    //				Date optime = context.getDateValue("LOG_OPERTIME");
                    //
                    //				context.addFieldValue("logOpertime",dateFormat.format(optime));

                    /**
                     //关联查询数据,单值查询
                     Map headdata = SQLExecutor.queryObjectWithDBName(Map.class,context.getEsjdbc().getDbConfig().getDbName(),
                     "select * from head where billid = ? and othercondition= ?",
                     context.getIntegerValue("billid"),"otherconditionvalue");//多个条件用逗号分隔追加
                     //将headdata中的数据,调用addFieldValue方法将数据加入当前es文档，具体如何构建文档数据结构根据需求定
                     context.addFieldValue("headdata",headdata);
                     //关联查询数据,多值查询
                     List<Map> facedatas = SQLExecutor.queryListWithDBName(Map.class,context.getEsjdbc().getDbConfig().getDbName(),
                     "select * from facedata where billid = ?",
                     context.getIntegerValue("billid"));
                     //将facedatas中的数据,调用addFieldValue方法将数据加入当前es文档，具体如何构建文档数据结构根据需求定
                     context.addFieldValue("facedatas",facedatas);
                     */
                }
            });
            //映射和转换配置结束
            /**
             * 内置线程池配置，实现多线程并行数据导入功能，作业完成退出时自动关闭该线程池
             */
            importBuilder.setParallel(true);//设置为多线程并行批量导入,false串行
            importBuilder.setQueue(10);//设置批量导入线程池等待队列长度
            importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量
            importBuilder.setContinueOnError(true);//任务出现异常，是否继续执行作业：true（默认值）继续执行 false 中断作业执行
            importBuilder.setAsyn(false);//true 异步方式执行，不等待所有导入作业任务结束，方法快速返回；false（默认值） 同步方式执行，等待所有导入作业任务结束，所有作业结束后方法才返回

            importBuilder.setUseLowcase(false)  //可选项，true 列名称转小写，false列名称不转换小写，默认false，只要在UseJavaName为false的情况下，配置才起作用
                    .setPrintTaskLog(true); //可选项，true 打印任务执行日志（耗时，处理记录数） false 不打印，默认值false
            importBuilder.setExportResultHandler(new ExportResultHandler<String,String>() {
                @Override
                public void success(TaskCommand<String,String> taskCommand, String result) {
                    TaskMetrics taskMetrics = taskCommand.getTaskMetrics();
                    logger.info(taskMetrics.toString());
                }

                @Override
                public void error(TaskCommand<String,String> taskCommand, String result) {
                    TaskMetrics taskMetrics = taskCommand.getTaskMetrics();
                    logger.info(taskMetrics.toString());
                }

                @Override
                public void exception(TaskCommand<String,String> taskCommand, Exception exception) {
                    TaskMetrics taskMetrics = taskCommand.getTaskMetrics();
                    logger.info(taskMetrics.toString());
                }

                @Override
                public int getMaxRetry() {
                    return 0;
                }
            });
           return importBuilder;
        });
    }

}
